package com.storm.demo.demo;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.storm.demo.BasicTopology;
import org.apache.commons.lang3.StringUtils;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.shade.org.apache.commons.collections.CollectionUtils;
import org.apache.storm.shade.org.apache.commons.collections.MapUtils;
import org.apache.storm.topology.BoltDeclarer;
import org.apache.storm.topology.TopologyBuilder;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/**
 * Demo Storm topology: wires one Kafka spout per configured index (1..99) into a single
 * {@link DemoCalcBolt} using local-or-shuffle grouping.
 *
 * <p>Spout indices are discovered by scanning the properties for
 * {@code kafka.start.time.<i>} / {@code kafka.end.time.<i>} pairs; an index with either
 * value missing is treated as not configured and skipped.
 */
public class DemoTopology extends BasicTopology {

    /** Exclusive upper bound of the 1-based spout index scan in {@link #createKafkaSpouts}. */
    private static final int MAX_SPOUT_INDEX = 100;

    /**
     * Builds the topology graph: N kafka spouts feeding one calculation bolt.
     *
     * @param props topology configuration (parallelism, kafka/zk settings)
     * @return the assembled {@link TopologyBuilder}
     * @throws IllegalArgumentException if no kafka spout is configured
     * @throws NullPointerException if a required parallelism property is missing
     */
    @Override
    protected TopologyBuilder createTopology(Properties props) {
        TopologyBuilder builder = new TopologyBuilder();

        List<KafkaSpout> kafkaSpouts = createKafkaSpouts(props);
        Preconditions.checkArgument(CollectionUtils.isNotEmpty(kafkaSpouts), "Kafka接收队列未配置.");
        // The bolt that holds the actual calculation logic.
        DemoCalcBolt calculateBolt = new DemoCalcBolt(props.getProperty(BasicTopology.PROPS_PATH));

        // Fail fast with a clear message when a parallelism property is absent; the
        // original auto-unboxing of a null Integer would throw a context-free NPE here.
        Integer spoutParallelism = MapUtils.getInteger(props, "topo.spout.parallel");
        Integer calculateParallelism = MapUtils.getInteger(props, "topo.calc.parallel");
        Preconditions.checkNotNull(spoutParallelism, "Missing property: topo.spout.parallel");
        Preconditions.checkNotNull(calculateParallelism, "Missing property: topo.calc.parallel");

        // Spout component ids are 1-based ("kafkaSpout-1", "kafkaSpout-2", ...).
        for (int i = 0; i < kafkaSpouts.size(); i++) {
            builder.setSpout("kafkaSpout-" + (i + 1), kafkaSpouts.get(i), spoutParallelism);
        }
        BoltDeclarer boltDeclarer = builder.setBolt("calcBolt", calculateBolt, calculateParallelism);
        for (int i = 0; i < kafkaSpouts.size(); i++) {
            boltDeclarer.localOrShuffleGrouping("kafkaSpout-" + (i + 1));
        }
        return builder;
    }

    /**
     * Scans indices 1..99 and creates a spout for every fully configured index.
     *
     * @param props topology configuration
     * @return the configured spouts, possibly empty; never {@code null}
     */
    public List<KafkaSpout> createKafkaSpouts(Properties props) {
        List<KafkaSpout> kafkaSpouts = Lists.newArrayList();
        for (int index = 1; index < MAX_SPOUT_INDEX; index++) {
            KafkaSpout kafkaSpout = createKafkaSpoutIndex(props, index);
            if (kafkaSpout != null) {
                kafkaSpouts.add(kafkaSpout);
            }
        }
        return kafkaSpouts;
    }

    /**
     * Creates the kafka spout for one configuration index.
     *
     * @param props topology configuration
     * @param index 1-based spout index used as the property-key suffix
     * @return the spout, or {@code null} if this index has no start/end time configured
     * @throws NullPointerException if the index is configured but {@code kafka.offset.zkServers} is missing
     */
    public KafkaSpout createKafkaSpoutIndex(Properties props, int index) {
        String topologyName = props.getProperty("topo.name");
        String startTime = props.getProperty("kafka.start.time" + "." + index);
        String endTime = props.getProperty("kafka.end.time" + "." + index);
        if (StringUtils.isEmpty(startTime) || StringUtils.isEmpty(endTime)) {
            // Index not configured; the caller's scan simply skips it.
            return null;
        }

        String topic = props.getProperty("kafka.topic" + "." + index);
        String brokerZkStr = props.getProperty("kafka.brokerZkStr");
        String brokerZkPath = props.getProperty("kafka.brokerZkPath");
        String zkServersProp = props.getProperty("kafka.offset.zkServers");
        // Fail fast: StringUtils.split(null) returns null and Arrays.asList(null) would NPE
        // with no hint about which property is to blame.
        Preconditions.checkNotNull(zkServersProp, "Missing property: kafka.offset.zkServers");
        List<String> zkServers = Arrays.asList(StringUtils.split(zkServersProp, ","));
        Integer zkPort = MapUtils.getInteger(props, "kafka.offset.zkPort");
        String zkRoot = props.getProperty("kafka.offset.zkRoot");
        // Consumer id must be unique per (topology, topic) pair so offsets don't collide in ZK.
        String id = StringUtils.join(topologyName, "-", topic);

        BrokerHosts kafkaBrokerZk = new ZkHosts(brokerZkStr, brokerZkPath);
        SpoutConfig spoutConfig = new SpoutConfig(kafkaBrokerZk, topic, zkRoot, id);
        spoutConfig.zkServers = zkServers;
        spoutConfig.zkPort = zkPort;
        // NOTE(review): likely redundant — the SpoutConfig constructor already receives zkRoot;
        // kept as-is, confirm before removing.
        spoutConfig.zkRoot = zkRoot;
        // Persist consumed offsets to ZooKeeper every 30 seconds.
        spoutConfig.stateUpdateIntervalMs = 30000;
        return new DemoKafkaSpout(spoutConfig, startTime, endTime);
    }

    /**
     * Cluster-mode entry point; delegates argument handling to {@link BasicTopology#run}.
     *
     * @param args command-line arguments forwarded to the base topology runner
     * @throws Exception if topology submission fails
     */
    public static void main(String[] args) throws Exception {
        DemoTopology notifyTopology = new DemoTopology();
        notifyTopology.run(args);
    }

}
